package com.xiaoyushu.count.biz.consumer;

import com.github.phantomthief.collection.BufferTrigger;
import com.google.common.collect.Maps;
import com.xiaoyushu.count.biz.constant.MQConstants;
import com.xiaoyushu.count.biz.constant.RedisKeyContsnts;
import com.xiaoyushu.count.biz.enums.FollowUnfollowTypeEnum;
import com.xiaoyushu.count.biz.model.dto.CountFollowUnfollowMqDTO;
import com.xiaoyushu.framework.common.util.JsonUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * 计数:粉丝数
 * 使用了 快手的BufferTrigger 配合RocketMQ 实现流量聚合
 */
@Component
@RocketMQMessageListener(consumerGroup = "xiaoyushu_group_"+ MQConstants.TOPIC_COUNT_FANS,//Group组
    topic = MQConstants.TOPIC_COUNT_FANS //主题 Topic
 )
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> { //TODO :Message


    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource
    private RedisTemplate<String,Object> redisTemplate;

    private
    BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
            .bufferSize(50000) // 缓存队列的最大容量
            .batchSize(1000)   // 一批次最多聚合 1000 条
            .linger(Duration.ofSeconds(1)) // 多久聚合一次
            .setConsumerEx(this::consumeMessage)
            .build();



    /**
     * @param body
     */
    @Override
    public void onMessage(String body) {
        log.info("## 消费到了 MQ [计数:粉丝数],{} ..."+body);
        //往 bufferTrigger 中添加元素
        bufferTrigger.enqueue(body);
    }

    private void consumeMessage(List<String> bodys){
        log.info("===> 聚合消息,size:{}",bodys.size());
        log.info("===> 聚合消息,{}", JsonUtils.toJsonString(bodys));

        //List<String> 转 List<CountFollowUnfollowMqDTO>
        List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream()
                .map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();

        //按目标用户进行分组
        Map<Long, List<CountFollowUnfollowMqDTO>>  groupMap= countFollowUnfollowMqDTOS.stream()
                .collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getTargetUserId));

        //按 组汇总数据,统计出最终的计数
        //key 为目标用户ID,value为最终操作的计数

        HashMap<Long, Integer> countMap = Maps.newHashMap();
        for (Map.Entry<Long,List<CountFollowUnfollowMqDTO>> entry: groupMap.entrySet()){
            List<CountFollowUnfollowMqDTO> list = entry.getValue();
            // 最终的计数值,默认为 0
            int finalCount=0;
            for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO:list){
                //获取操作类型
                Integer type = countFollowUnfollowMqDTO.getType();
                //根据操作类型,获取对应 枚举
                FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);
                // 若枚举为空,跳到下一次循环
                if (Objects.isNull(followUnfollowTypeEnum)) continue;

                switch(followUnfollowTypeEnum){
                    case FOLLOW -> finalCount+=1;//如果为关注操作,粉丝数+1
                    case UNFOLLOW -> finalCount-=1;//如果为取关操作,粉丝数 -1
                }
            }
            //将分组后统计出的最终计数,存入countMap中
            countMap.put(entry.getKey(),finalCount);
        }
        log.info("## 聚合后的计数数据:{}", JsonUtils.toJsonString(countMap));
        //更新Redis
        countMap.forEach((k,v)->{
            // Redis Key
            String redisKey = RedisKeyContsnts.buildCountUserKey(k);
            //判断Redis 中的Hash是否存在
            Boolean isExisted = redisTemplate.hasKey(redisKey);
            //若存在才会更新
            // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)\
            if (isExisted){
                //对目标用户 Hash 中的粉丝数字段进行计数操作
                redisTemplate.opsForHash().increment(redisKey,RedisKeyContsnts.FIELD_FANS_TOTAL,v);
            }
        });
        //发送MQ,计数数据落库
        // 构建消息体 DTO
      Message<String> message= MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
                .build();

      //异步发送 MQ 消息,提升接口响应速度
        rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("===> [计数服务:粉丝数入库] MQ 发送成功,SendResult:{}",sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("==> [计数服务:粉丝数入库] MQ 发送异常:",throwable);
            }
        });


    }
}
